package com.k2data.processors;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pass-through processor: logs the key and value of every record it receives and
 * then hands the same pair, unchanged, to {@code context.forward(key, value)}.
 *
 * Created by luoqifei on 17-1-13.
 */
public class AnomalyProcessor implements Processor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(AnomalyProcessor.class);

    /** Context received in {@link #init(ProcessorContext)}; {@link #process(String, String)} forwards through it. */
    private ProcessorContext context;

    /**
     * Stores the context so that later calls to {@link #process(String, String)} can forward records.
     *
     * @param processorContext the context to forward records through
     */
    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
    }

    /**
     * Logs the record and forwards it without modification.
     *
     * @param s  the record key
     * @param s2 the record value
     */
    @Override
    public void process(String s, String s2) {
        // Go through the class logger rather than stdout so the output honours the
        // application's logging configuration; placeholders avoid eager concatenation.
        logger.info("anomaly key is:{} . value is:{}", s, s2);
        this.context.forward(s, s2);
    }

    /**
     * No-op: this processor does no work when punctuated.
     *
     * @param l unused
     */
    @Override
    public void punctuate(long l) {
        // nothing to do
    }

    /**
     * No-op: this class holds no resources of its own to release.
     */
    @Override
    public void close() {
        // nothing to do
    }
}
